package com.atguigu.flink.wordcount;

import com.atguigu.flink.pojo.WordCount;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Created by Smexy on 2023/11/7
 *
 * Why use a Bean (POJO) instead of a Tuple:
 *   1) Data consumed from Kafka is JSON; a JSON library can map it directly
 *      to a Bean, but it cannot be converted to a Tuple.
 *   2) Tuples hold a limited number of fields (at most 25); if the business
 *      logic is complex and many fields must be parsed, a Tuple cannot store them.
 *   3) Tuple-based code is hard to read in downstream processing, e.g.:
 *        keyBy(Tuple.f0)
 *        sum(1)
 *
 * In Flink a Bean is also called a POJO (plain old java object),
 * i.e. a traditional Java object.
 */
public class Demo5_Pojo
{
    public static void main(String[] args) throws Exception {

        System.out.println("main执行了....");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Unbounded stream: raw text lines arriving over a socket.
        DataStreamSource<String> lines = env.socketTextStream("hadoop102", 8888);

        lines
            .flatMap(new FlatMapFunction<String, WordCount>()
            {
                // Split each line on single spaces and emit one WordCount(word, 1) per token.
                @Override
                public void flatMap(String line, Collector<WordCount> collector) throws Exception {
                    for (String token : line.split(" ")) {
                        collector.collect(new WordCount(token, 1));
                    }
                }
            })
            .keyBy(new KeySelector<WordCount, String>()
            {
                // Partition the stream by the POJO's word field.
                @Override
                public String getKey(WordCount wc) throws Exception {
                    return wc.getWord();
                }
            })
            // Upstream records are POJOs, so sum() takes the POJO field name.
            .sum("count")
            .print();

        // Launch the job; blocks (and may throw) until the streaming job terminates.
        env.execute();
    }
}
